package main

import (
	"encoding/base64"
	"fmt"
	"github.com/gorilla/mux"
	audio_transcoder "github.com/lkmio/audio-transcoder"
	"github.com/lkmio/avformat/bufio"
	"github.com/lkmio/lkm/gb28181"
	"github.com/lkmio/lkm/log"
	"github.com/lkmio/lkm/stream"
	"net"
	"net/http"
	"strconv"
	"time"
)

const (
	InviteTypePlay      = "play"
	InviteTypePlayback  = "playback"
	InviteTypeDownload  = "download"
	InviteTypeBroadcast = "broadcast"
	InviteTypeTalk      = "talk"
)

type SDP struct {
	SessionName string  `json:"session_name,omitempty"` // play/download/playback/talk/broadcast
	Addr        string  `json:"addr,omitempty"`         // 连接地址
	SSRC        string  `json:"ssrc,omitempty"`
	Setup       string  `json:"setup,omitempty"`     // active/passive
	Transport   string  `json:"transport,omitempty"` // tcp/udp
	Speed       float64 `json:"speed,omitempty"`
	StartTime   int     `json:"start_time,omitempty"`
	EndTime     int     `json:"end_time,omitempty"`
	FileSize    int     `json:"file_size,omitempty"`
}

type DownloadInfo struct {
	PlaybackDuration  int     // 回放/下载时长
	PlaybackSpeed     float64 // 回放/下载速度
	PlaybackFileURL   string  // 回放/下载文件URL
	PlaybackStartTime string  // 回放/下载开始时间
	PlaybackEndTime   string  // 回放/下载结束时间
	PlaybackFileSize  int     // 回放/下载文件大小
	PlaybackProgress  float64 // 1-下载完成
	Progress          float64
}

type SourceSDP struct {
	Source string `json:"source"` // GetSourceID
	SDP
}

type GBOffer struct {
	SourceSDP
	AnswerSetup         string                     `json:"answer_setup,omitempty"` // 希望应答的连接方式
	TransStreamProtocol stream.TransStreamProtocol `json:"trans_stream_protocol,omitempty"`
}

func Source2GBSource(source stream.Source) gb28181.GBSource {
	if gbSource, ok := source.(*gb28181.PassiveSource); ok {
		return gbSource
	} else if gbSource, ok := source.(*gb28181.ActiveSource); ok {
		return gbSource
	} else if gbSource, ok := source.(*gb28181.PassiveSource); ok {
		return gbSource
	}

	return nil
}

func (api *ApiServer) OnGBSourceCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
	log.Sugar.Infof("创建国标源: %v", v)

	// 返回收流地址
	response := &struct {
		SDP
		Urls []string `json:"urls"`
	}{}

	var err error
	// 响应错误消息
	defer func() {
		if err != nil {
			log.Sugar.Errorf("创建国标源失败 err: %s", err.Error())
			httpResponseError(w, err.Error())
		}
	}()

	source := stream.SourceManager.Find(v.Source)
	if source != nil {
		err = fmt.Errorf("%s 源已经存在", v.Source)
		return
	}

	tcp := true
	var active bool
	if v.Setup == "passive" {
	} else if v.Setup == "active" {
		active = true
	} else {
		tcp = false
		//udp收流
	}

	var ssrc string
	if InviteTypeDownload == v.SessionName || InviteTypePlayback == v.SessionName {
		ssrc = gb28181.GetVodSSRC()
	} else {
		ssrc = gb28181.GetLiveSSRC()
	}

	ssrcValue, _ := strconv.Atoi(ssrc)
	gbSource, port, err := gb28181.NewGBSource(v.Source, uint32(ssrcValue), tcp, active)
	if err != nil {
		return
	} else if InviteTypeDownload == v.SessionName {
		// 开启录制
		gbSource.GetTransStreamPublisher().StartRecord()
	}

	startTime := time.Unix(int64(v.StartTime), 0).Format("2006-01-02T15:04:05")
	endTime := time.Unix(int64(v.EndTime), 0).Format("2006-01-02T15:04:05")
	gbSource.SetSessionName(v.SessionName)
	gbSource.SetStartTime(startTime)
	gbSource.SetEndTime(endTime)
	gbSource.SetSpeed(v.Speed)
	gbSource.SetDuration(v.EndTime - v.StartTime)

	response.Addr = net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port))
	response.Urls = stream.GetStreamPlayUrls(v.Source)
	response.SSRC = ssrc

	log.Sugar.Infof("创建国标源成功, addr: %s, ssrc: %d", response.Addr, ssrcValue)
	httpResponseOK(w, response)
}

func (api *ApiServer) OnGBSourceConnect(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
	log.Sugar.Infof("设置国标应答: %v", v)

	var err error
	// 响应错误消息
	defer func() {
		if err != nil {
			log.Sugar.Errorf("设置国标应答失败 err: %s", err.Error())
			httpResponseError(w, err.Error())
		}
	}()

	source := stream.SourceManager.Find(v.Source)
	if source == nil {
		err = fmt.Errorf("%s 源不存在", v.Source)
	} else if stream.SourceType28181 != source.GetType() {
		err = fmt.Errorf("%s 源不是28181类型", v.Source)
	} else if activeSource, ok := source.(*gb28181.ActiveSource); ok {
		activeSource.SetFileSize(v.FileSize)
		// 主动连接取流
		var addr *net.TCPAddr
		addr, err = net.ResolveTCPAddr("tcp", v.Addr)
		if err != nil {
			return
		}

		if err = activeSource.Connect(addr); err == nil {
			httpResponseOK(w, nil)
		}
	} else if passiveSource, ok := source.(*gb28181.PassiveSource); ok {
		passiveSource.SetFileSize(v.FileSize)
	} else if udpSource, ok := source.(*gb28181.UDPSource); ok {
		udpSource.SetFileSize(v.FileSize)
	}
}

func (api *ApiServer) OnGBOfferCreate(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
	// 预览下级设备
	if v.SessionName == "" || v.SessionName == InviteTypePlay ||
		v.SessionName == InviteTypePlayback ||
		v.SessionName == InviteTypeDownload {
		api.OnGBSourceCreate(v, w, r)
	} else {
		// 向上级转发广播和对讲, 或者是向设备发送invite talk
	}
}

func (api *ApiServer) AddForwardSink(protocol stream.TransStreamProtocol, transport stream.TransportType, sourceId string, remoteAddr string, ssrc, sessionName string, w http.ResponseWriter, r *http.Request) {
	// 解析或生成应答的ssrc
	var ssrcOffer int
	var ssrcAnswer string
	if ssrc != "" {
		var err error
		ssrcOffer, err = strconv.Atoi(ssrc)
		if err != nil {
			log.Sugar.Errorf("解析ssrc失败 err: %s ssrc: %s", err.Error(), ssrc)
		} else {
			ssrcAnswer = ssrc
		}
	}

	if ssrcAnswer == "" {
		if "download" != sessionName && "playback" != sessionName {
			ssrcAnswer = gb28181.GetLiveSSRC()
		} else {
			ssrcAnswer = gb28181.GetVodSSRC()
		}

		var err error
		ssrcOffer, err = strconv.Atoi(ssrcAnswer)
		// 严重错误, 直接panic
		if err != nil {
			panic(err)
		}
	}

	var port int
	sink, port, err := stream.ForwardStream(protocol, transport, sourceId, r.URL.Query(), remoteAddr, gb28181.TransportManger, uint32(ssrcOffer))
	if err != nil {
		log.Sugar.Errorf("创建转发sink失败 err: %s", err.Error())
		httpResponseError(w, err.Error())
		return
	}

	log.Sugar.Infof("创建转发sink成功, sink: %s port: %d transport: %s ssrc: %s", sink.GetID(), port, transport, ssrcAnswer)

	response := struct {
		Sink string `json:"sink"` // sink id
		SDP
	}{Sink: stream.SinkID2String(sink.GetID()), SDP: SDP{Addr: net.JoinHostPort(stream.AppConfig.PublicIP, strconv.Itoa(port)), SSRC: ssrcAnswer}}

	httpResponseOK(w, &response)
}

func (api *ApiServer) OnSinkAdd(v *GBOffer, w http.ResponseWriter, r *http.Request) {
	log.Sugar.Infof("添加sink: %v", *v)
	if stream.TransStreamGBCascaded != v.TransStreamProtocol && stream.TransStreamGBTalk != v.TransStreamProtocol && stream.TransStreamGBGateway != v.TransStreamProtocol {
		httpResponseError(w, "不支持的协议")
		return
	}

	setup := gb28181.SetupTypeFromString(v.Setup)
	if v.AnswerSetup != "" {
		setup = gb28181.SetupTypeFromString(v.AnswerSetup)
	}

	api.AddForwardSink(v.TransStreamProtocol, setup.TransportType(), v.Source, v.Addr, v.SSRC, v.SessionName, w, r)
}

// OnGBTalk 国标广播/对讲流程:
// 1. 浏览器使用WS携带source_id访问/api/v1/gb28181/talk, 如果source_id冲突, 直接断开ws连接
// 2. WS链接建立后, 调用gb-cms接口/api/v1/broadcast/invite, 向设备发送广播请求
func (api *ApiServer) OnGBTalk(w http.ResponseWriter, r *http.Request) {
	conn, err := api.upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
		conn.Close()
		return
	}

	// 获取id
	id := r.FormValue("source")

	talkSource := gb28181.NewTalkSource(id, conn)
	talkSource.Init()
	talkSource.SetUrlValues(r.Form)

	_, err = stream.PreparePublishSource(talkSource, true)
	if err != nil {
		log.Sugar.Errorf("对讲失败, err: %s source: %s", err, talkSource)
		conn.Close()
		return
	}

	log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)

	stream.LoopEvent(talkSource)

	data := stream.UDPReceiveBufferPool.Get().([]byte)

	for {
		_, bytes, err := conn.ReadMessage()
		length := len(bytes)
		if err != nil {
			log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
			break
		} else if length < 1 {
			continue
		}

		for i := 0; i < length; {
			n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
			copy(data, bytes[:n])
			_, _ = talkSource.PublishSource.Input(data[:n])
			i += n
		}
	}

	talkSource.Close()
}

// OnLiveGBSTalk liveGBS前端对讲
func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	device := vars["device"]
	channel := vars["channel"]
	_ = r.URL.Query().Get("format")

	conn, err := api.upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Sugar.Errorf("升级为websocket失败 err: %s", err.Error())
		conn.Close()
		return
	}

	id := device + "/" + channel + ".broadcast"
	talkSource := gb28181.NewTalkSource(id, conn)
	talkSource.Init()
	talkSource.SetUrlValues(r.Form)

	_, err = stream.PreparePublishSource(talkSource, true)
	if err != nil {
		log.Sugar.Errorf("对讲失败, err: %s source: %s", err, talkSource)
		conn.Close()
		return
	}

	log.Sugar.Infof("ws对讲连接成功, source: %s", talkSource)

	stream.LoopEvent(talkSource)

	data := stream.UDPReceiveBufferPool.Get().([]byte)
	pcm := make([]byte, 32000)
	g711aPacket := make([]byte, stream.UDPReceiveBufferSize/2)

	for {
		_, bytes, err := conn.ReadMessage()
		length := len(bytes)
		if err != nil {
			log.Sugar.Errorf("读取对讲音频包失败, source: %s err: %s", id, err.Error())
			break
		} else if length < 1 {
			continue
		}

		// 扩容
		if int(float64(len(bytes))*1.4) > len(pcm) {
			pcm = make([]byte, len(bytes)*2)
		}

		// base64解密
		var pcmN int
		pcmN, err = base64.StdEncoding.Decode(pcm, bytes)
		if err != nil {
			log.Sugar.Errorf("base64解密失败, source: %s err: %s", id, err.Error())
			continue
		}

		for i := 0; i < pcmN; {
			// 控制每包大小
			n := bufio.MinInt(stream.UDPReceiveBufferSize, length-i)
			copy(data, pcm[:n])

			// 编码成G711A
			audio_transcoder.EncodeAlawToBuffer(data, g711aPacket)

			_, _ = talkSource.PublishSource.Input(g711aPacket[:n/2])
			i += n
		}
	}

	talkSource.Close()
}

func (api *ApiServer) OnGBSpeedSet(v *SourceSDP, w http.ResponseWriter, r *http.Request) {
	source := stream.SourceManager.Find(v.Source)
	if source == nil {
		w.WriteHeader(http.StatusBadRequest)
		httpResponseError(w, "stream not found")
	} else if stream.SourceType28181 != source.GetType() {
		w.WriteHeader(http.StatusBadRequest)
		httpResponseError(w, "stream type not support")
	} else if gbSource := Source2GBSource(source); gbSource != nil {
		gbSource.SetSpeed(v.Speed)
	}
}
